In [ ]:
import datetime as dt
import urllib2          # Python 2 HTTP client; used to fetch the Firefox release history
import ujson as json    # fast JSON encoder/decoder
from os import environ
%pylab inline
Get the time when this job was started (for debugging purposes).
In [ ]:
starttime = dt.datetime.now()
starttime
Declare the channel to look at.
In [ ]:
channel_to_process = "release"
In [ ]:
sc.defaultParallelism
In [ ]:
# Uncomment the next line and adjust |today_env_str| as necessary to run manually
#today_env_str = "20161105"
today_env_str = environ.get("date", None)
assert (today_env_str is not None), "The date environment parameter is missing."
today = dt.datetime.strptime(today_env_str, "%Y%m%d").date()
# Find the most recent Wednesday on or before today to anchor the 7-day range
last_wednesday = today
current_weekday = today.weekday()
if current_weekday < 2:
    last_wednesday -= (dt.timedelta(days=5) + dt.timedelta(days=current_weekday))
if current_weekday > 2:
    last_wednesday -= (dt.timedelta(days=current_weekday) - dt.timedelta(days=2))
min_range = last_wednesday - dt.timedelta(days=17)
report_date_str = last_wednesday.strftime("%Y%m%d")
min_range_str = min_range.strftime("%Y%m%d")
min_range_dash_str = min_range.strftime("%Y-%m-%d")
[last_wednesday, min_range_str, report_date_str]
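As a quick sanity check, the last-Wednesday logic above can be verified against a few known dates. The helper `wednesday_on_or_before` below is a hypothetical one-line equivalent of the two `if` branches, included only for this spot check:
In [ ]:
# Illustrative check: this closed-form version should agree with the logic above.
def wednesday_on_or_before(d):
    # Monday is weekday 0, so Wednesday is weekday 2.
    return d - dt.timedelta(days=(d.weekday() - 2) % 7)
# 2016-11-05 was a Saturday; the Wednesday on or before it is 2016-11-02.
assert wednesday_on_or_before(dt.date(2016, 11, 5)) == dt.date(2016, 11, 2)
# A Wednesday maps to itself.
assert wednesday_on_or_before(dt.date(2016, 11, 2)) == dt.date(2016, 11, 2)
assert wednesday_on_or_before(dt.date(2016, 11, 1)) == dt.date(2016, 10, 26)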
The longitudinal dataset can be accessed as a Spark DataFrame, which is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python.
In [ ]:
sql_str = "SELECT * FROM longitudinal_v" + today_env_str
frame = sqlContext.sql(sql_str)
sql_str
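As an optional structural check (the longitudinal schema is wide, so printing the full schema is verbose), the column count and a small sample of column names can be listed:
In [ ]:
# Peek at the DataFrame structure (optional sanity check).
len(frame.columns), frame.columns[:5]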
Restrict the dataframe to the desired channel.
In [ ]:
channel_subset = frame.filter(frame.normalized_channel == channel_to_process)
Restrict the dataframe to the columns needed for the analysis.
In [ ]:
data_subset = channel_subset.select("subsession_start_date",
                                    "subsession_length",
                                    "update_check_code_notify",
                                    "update_check_no_update_notify",
                                    "build.version",
                                    "settings.update.enabled")
Restrict the data to the desired range: keep only pings whose first subsession started on or after min_range, which is at least 17 days before the creation date of the longitudinal dataset.
In [ ]:
def start_date_filter(d):
    try:
        date = dt.datetime.strptime(d.subsession_start_date[0][:10], "%Y-%m-%d").date()
        return min_range <= date
    except (ValueError, TypeError):
        return False
date_filtered = data_subset.rdd.filter(start_date_filter)
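The filter can be exercised locally before running it over the full RDD. `MockRow` below is just a stand-in with the one field the filter reads (a sketch, not part of the job itself):
In [ ]:
# Quick local test of start_date_filter using a minimal stand-in row.
from collections import namedtuple
MockRow = namedtuple("MockRow", ["subsession_start_date"])
# A date on or after min_range passes; malformed or missing dates are dropped.
(start_date_filter(MockRow([min_range_dash_str])),
 start_date_filter(MockRow(["not-a-date"])),
 start_date_filter(MockRow(None)))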
Analyze the data to determine the number of users on a current version of Firefox vs. a version that's out of date. A "user on a current version" is defined as being on either the latest version as of the beginning of the 7-day range, according to the firefox_history_major_releases.json file on product-details.mozilla.org, or one of the two versions just prior to it. Versions prior to Firefox 42 are ignored, since unified telemetry was not enabled by default on earlier versions.
In [ ]:
def latest_version_on_date(date, major_releases):
    latest_date, latest_version = u"1900-01-01", 0
    for version, release_date in major_releases.iteritems():
        version_int = int(version.split(".")[0])
        if release_date <= date and release_date >= latest_date and version_int >= latest_version:
            latest_date = release_date
            latest_version = version_int
    return latest_version
major_releases_json = urllib2.urlopen("https://product-details.mozilla.org/1.0/firefox_history_major_releases.json").read()
major_releases = json.loads(major_releases_json)
latest_version = latest_version_on_date(min_range_dash_str, major_releases)
def status_mapper(d):
    try:
        if d.version is None or d.version[0] is None:
            return ("none-version", d)
        curr_version = int(d.version[0].split(".")[0])
        if curr_version < 42:
            return ("ignore-version-too-low", d)
        if curr_version < latest_version - 2:
            # Check whether the user ran this particular orphaned version of Firefox for at
            # least 2 hours in the last 12 weeks. An orphaned user is running a version of
            # Firefox that's at least 3 versions behind the current version, which means an
            # update has been available for at least 12 weeks. The 2 hour threshold gives
            # most systems a chance to perform an update check, download the update, and
            # restart Firefox after the update has been downloaded.
            seconds = 0
            first_version = d.version[0]
            index = 0
            twelve_weeks_ago = last_wednesday - dt.timedelta(weeks=12)
            while seconds < 7200 and index < len(d.version) and d.version[index] == first_version:
                try:
                    date = dt.datetime.strptime(d.subsession_start_date[index][:10], "%Y-%m-%d").date()
                    if date < twelve_weeks_ago:
                        return ("out-of-date-not-run-long-enough", d)
                    seconds += d.subsession_length[index]
                    index += 1
                except (ValueError, TypeError):
                    index += 1
            if seconds >= 7200:
                return ("out-of-date", d)
            return ("out-of-date-not-run-long-enough", d)
        return ("up-to-date", d)
    except ValueError:
        return ("value-error", d)
statuses = date_filtered.map(status_mapper).cache()
up_to_date_results = statuses.countByKey()
up_to_date_json_results = json.dumps(up_to_date_results, ensure_ascii=False)
up_to_date_json_results
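For reference, `latest_version_on_date` can be spot-checked against a tiny hand-built release map (the entries below are illustrative; the assertions depend only on the sample data, not on the real release history):
In [ ]:
# Illustrative check of latest_version_on_date with a two-entry release map.
sample_releases = {u"49.0": u"2016-09-20", u"50.0": u"2016-11-15"}
# On 2016-11-01 only 49.0 has shipped, so it is the latest major version.
assert latest_version_on_date(u"2016-11-01", sample_releases) == 49
# After 2016-11-15, 50.0 becomes the latest.
assert latest_version_on_date(u"2016-11-20", sample_releases) == 50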
For people who are out-of-date, determine how many of them have updates disabled:
In [ ]:
# Tuple membership uses equality, so this keeps only pings whose exact status is "out-of-date".
out_of_date_statuses = statuses.filter(lambda p: "out-of-date" in p)
def update_disabled_mapper(d):
    status, ping = d
    if ping is None or ping.enabled is None or ping.enabled[0] is None:
        return ("none-update-enabled", ping)
    if ping.enabled[0] == True:
        return ("update-enabled", ping)
    return ("update-disabled", ping)
update_enabled_disabled_statuses = out_of_date_statuses.map(update_disabled_mapper)
update_enabled_disabled_results = update_enabled_disabled_statuses.countByKey()
update_enabled_disabled_json_results = json.dumps(update_enabled_disabled_results, ensure_ascii=False)
update_enabled_disabled_json_results
Focus on orphaned users who have updates enabled.
In [ ]:
update_enabled_statuses = update_enabled_disabled_statuses.filter(lambda p: "update-enabled" in p).cache()
For people who are out-of-date and have updates enabled, determine the distribution across Firefox versions.
In [ ]:
def version_mapper(d):
    status, ping = d
    return (ping.version[0], ping)
orphaned_by_versions = update_enabled_statuses.map(version_mapper)
orphaned_by_versions_results = orphaned_by_versions.countByKey()
orphaned_by_versions_json_results = json.dumps(orphaned_by_versions_results, ensure_ascii=False)
orphaned_by_versions_json_results
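Since `%pylab inline` is loaded, the distribution can also be plotted. A minimal sketch, assuming the version strings sort acceptably as plain strings:
In [ ]:
# Bar chart of orphaned users per Firefox version (illustrative).
sorted_versions = sorted(orphaned_by_versions_results.items())
labels = [v for v, _ in sorted_versions]
counts = [c for _, c in sorted_versions]
positions = arange(len(labels))
bar(positions, counts, align="center")
xticks(positions, labels, rotation=90)
ylabel("users")
title("Orphaned users by version")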
For people who are out-of-date and have updates enabled, determine what the update check returns.
In [ ]:
def update_check_code_notify_mapper(d):
    status, ping = d
    if ping is None or ping.update_check_code_notify is None:
        return -1
    # Each entry is a histogram; return the index of the first nonzero bucket.
    for check_code in ping.update_check_code_notify:
        counter = -1
        for i in check_code:
            counter += 1
            if i != 0:
                return counter
    if ping.update_check_no_update_notify is not None and ping.update_check_no_update_notify[0] > 0:
        return 0
    return -1
update_check_code_notify_statuses = update_enabled_statuses.map(update_check_code_notify_mapper)
update_check_code_notify_results = update_check_code_notify_statuses.countByValue()
update_check_code_notify_json_results = json.dumps(update_check_code_notify_results, ensure_ascii=False)
update_check_code_notify_json_results
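The raw countByValue result maps each check code (or the -1/0 sentinels above) to a count; sorting by descending count makes it easier to scan (a small formatting helper, not part of the stored output):
In [ ]:
# Sort check codes by descending count for easier reading (illustrative).
sorted(update_check_code_notify_results.items(), key=lambda kv: kv[1], reverse=True)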
Write results to JSON.
In [ ]:
latest_version_object = {"latest-version": latest_version}
up_to_date_object = {"up-to-date": up_to_date_results}
update_enabled_disabled_object = {"update-enabled-disabled": update_enabled_disabled_results}
update_check_code_notify_object = {"update-check-code-notify": update_check_code_notify_results}
orphaned_by_versions_object = {"orphaned-by-versions": orphaned_by_versions_results}
final_results = [up_to_date_object,
                 update_enabled_disabled_object,
                 update_check_code_notify_object,
                 latest_version_object,
                 orphaned_by_versions_object]
final_results_json = json.dumps(final_results, ensure_ascii=False)
final_results_json
Finally, store the output in the local directory to be uploaded automatically once the job completes. The file will be stored at:
https://analysis-output.telemetry.mozilla.org/SPARKJOBNAME/data/FILENAME
In [ ]:
filename = "./output/" + report_date_str + ".json"
with open(filename, 'w') as f:
    f.write(final_results_json)
filename
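Optionally, read the file back to confirm it was written and parses as JSON:
In [ ]:
# Read the output back to verify it is valid JSON (optional sanity check).
with open(filename) as f:
    json.loads(f.read())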
Get the time when this job ended (for debugging purposes):
In [ ]:
endtime = dt.datetime.now()
endtime
In [ ]:
difference = endtime - starttime
difference